api-reference-examples/python-notebook/ThreatExchange Data Dashboard.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# ThreatExchange Data Dashboard\n", "\n", "**Purpose**\n", " \n", "The ThreatExchange APIs are designed to make consuming threat intelligence from multiple sources easy. This notebook will walk you through:\n", "\n", " - building an initial dashboard for assessing the data visible to your appID;\n", " - filtering down to a subset you consider *high value*; and\n", " - exporting the high value data to a file.\n", "\n", "**What you need**\n", "\n", "Before getting started, you'll need a few Python packages installed:\n", "\n", " - [Pandas](http://pandas.pydata.org/) for data manipulation and analysis\n", " - [Pytx](https://pytx.readthedocs.org/en/latest/installation.html) for ThreatExchange access\n", " - [Seaborn](https://stanford.edu/~mwaskom/software/seaborn/) for making charts pretty\n", "\n", "All of the python packages mentioned can be installed via \n", "\n", "```\n", "pip install <package_name>\n", "```\n", "\n", "### Setup a ThreatExchange `access_token`\n", "\n", "If you don't already have an `access_token` for your app, use the [Facebook Access Token Tool]( https://developers.facebook.com/tools/accesstoken/) to get one." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from pytx.access_token import access_token\n", "from pytx.logger import setup_logger\n", "from pytx.vocabulary import PrivacyType as pt\n", "\n", "# Specify the location of your token via one of several ways:\n", "# https://pytx.readthedocs.org/en/latest/pytx.access_token.html\n", "access_token()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, enable debug level logging" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Uncomment this if you want debug logging enabled\n", "#setup_logger(log_file=\"pytx.log\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Search for data in ThreatExchange\n", "\n", "Start by running a query against the ThreatExchange APIs to pull down any/all data relevant to you over a specified period of days." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Our basic search parameters, we default to querying over the past 14 days\n", "days_back = 14\n", "search_terms = ['abuse', 'phishing', 'malware', 'exploit', 'apt', 'ddos', 'brute', 'scan', 'cve']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we execute the query using our search parameters and put the results in a Pandas `DataFrame`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from datetime import datetime, timedelta\n", "from time import strftime\n", "import pandas as pd\n", "import re\n", "\n", "from pytx import ThreatDescriptor\n", "from pytx.vocabulary import ThreatExchange as te\n", "\n", "# Define your search string and other params, see \n", "# https://pytx.readthedocs.org/en/latest/pytx.common.html#pytx.common.Common.objects\n", "# for the full list of options\n", "search_params = {\n", " te.FIELDS: ThreatDescriptor._default_fields,\n", " te.LIMIT: 1000,\n", " te.SINCE: strftime('%Y-%m-%d %H:%m:%S +0000', (datetime.utcnow() + timedelta(days=(-1*days_back))).timetuple()),\n", " te.TEXT: search_terms,\n", " te.UNTIL: strftime('%Y-%m-%d %H:%m:%S +0000', datetime.utcnow().timetuple()),\n", " te.STRICT_TEXT: False\n", "}\n", "\n", "data_frame = None\n", "for search_term in search_terms:\n", " print \"Searching for '%s' over -%d days\" % (search_term, days_back)\n", " results = ThreatDescriptor.objects(\n", " fields=search_params[te.FIELDS],\n", " limit=search_params[te.LIMIT],\n", " text=search_term, \n", " since=search_params[te.SINCE], \n", " until=search_params[te.UNTIL],\n", " strict_text=search_params[te.STRICT_TEXT]\n", " )\n", " tmp = pd.DataFrame([result.to_dict() for result in results])\n", " tmp['search_term'] = search_term\n", " print \"\\t... found %d descriptors\" % tmp.size\n", " if data_frame is None:\n", " data_frame = tmp\n", " else:\n", " data_frame = data_frame.append(tmp)\n", " \n", "print \"\\nFound %d descriptors in total.\" % data_frame.size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Do some data munging for easier analysis and then preview as a sanity check" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from time import mktime\n", "\n", "# Extract a datetime and timestamp, for easier analysis\n", "data_frame['ds'] = pd.to_datetime(data_frame.added_on.str[0:10], format='%Y-%m-%d')\n", "data_frame['ts'] = pd.to_datetime(data_frame.added_on)\n", "\n", "# Extract the owner data\n", "owner = data_frame.pop('owner')\n", "owner = owner.apply(pd.Series)\n", "data_frame = pd.concat([data_frame, owner.email, owner.name], axis=1)\n", "\n", "# Extract freeform 'tags' in the description\n", "def extract_tags(text):\n", " return re.findall(r'\\[([a-zA-Z0-9\\:\\-\\_]+)\\]', text)\n", "data_frame['tags'] = data_frame.description.map(lambda x: [] if x is None else extract_tags(x))\n", "\n", "data_frame.head(n=5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Dashboard to Get a High-level View\n", "\n", "The raw data is great, but it would be much better if we could take a higher level view of the data. 
"\n", " - what data is available\n", " - who's sharing it\n", " - how it is labeled\n", " - how much of it is likely to be directly applicable for alerting" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import math\n", "import matplotlib.pyplot as plt\n", "import seaborn as sns\n", "\n", "%matplotlib inline\n", "\n", "# Set up subplots for our dashboard\n", "fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(16,32))\n", "axes[0,0].set_color_cycle(sns.color_palette(\"coolwarm_r\", 15))\n", "\n", "# Plot by indicator type over time\n", "type_over_time = data_frame.groupby(\n", "    [pd.Grouper(freq='d', key='ds'), te.TYPE]\n", ").count().unstack(te.TYPE)\n", "type_over_time.added_on.plot(\n", "    kind='line',\n", "    stacked=True,\n", "    title=\"Indicator Types Per Day (-\" + str(days_back) + \"d)\",\n", "    ax=axes[0,0]\n", ")\n", "\n", "# Plot by threat_type over time\n", "tt_over_time = data_frame.groupby(\n", "    [pd.Grouper(freq='w', key='ds'), 'threat_type']\n", ").count().unstack('threat_type')\n", "tt_over_time.added_on.plot(\n", "    kind='bar',\n", "    stacked=True,\n", "    title=\"Threat Types Per Week (-\" + str(days_back) + \"d)\",\n", "    ax=axes[0,1]\n", ")\n", "\n", "# Plot the top 10 tags\n", "tags = pd.DataFrame([item for sublist in data_frame.tags for item in sublist])\n", "tags[0].value_counts().head(10).plot(\n", "    kind='bar',\n", "    stacked=True,\n", "    title=\"Top 10 Tags (-\" + str(days_back) + \"d)\",\n", "    ax=axes[1,0]\n", ")\n", "\n", "# Plot by who is sharing\n", "owner_over_time = data_frame.groupby(\n", "    [pd.Grouper(freq='w', key='ds'), 'name']\n", ").count().unstack('name')\n", "owner_over_time.added_on.plot(\n", "    kind='bar',\n", "    stacked=True,\n", "    title=\"Who's Sharing Each Week? (-\" + str(days_back) + \"d)\",\n",
(-\" + str(days_back) + \"d)\",\n", " ax=axes[1,1]\n", ")\n", "\n", "# Plot the data as a timeseries of when it was published\n", "data_over_time = data_frame.groupby(pd.Grouper(freq='6H', key='ts')).count()\n", "data_over_time.added_on.plot(\n", " kind='line',\n", " title=\"Data shared over time (-\" + str(days_back) + \"d)\",\n", " ax=axes[2,0]\n", ")\n", "\n", "# Plot by status label\n", "data_frame.status.value_counts().plot(\n", " kind='pie', \n", " title=\"Threat Statuses (-\" + str(days_back) + \"d)\",\n", " ax=axes[2,1]\n", ")\n", "\n", "# Heatmap by type / source\n", "owner_and_type = pd.DataFrame(data_frame[['name', 'type']])\n", "owner_and_type['n'] = 1\n", "grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)\n", "ax = sns.heatmap(\n", " data=grouped['n'], \n", " robust=True,\n", " cmap=\"YlGnBu\",\n", " ax=axes[3,0]\n", ")\n", "\n", "# These require a little data munging\n", "# translate a severity enum to a value\n", "# TODO Add this translation to Pytx\n", "def severity_value(severity):\n", " if severity == 'UNKNOWN': return 0\n", " elif severity == 'INFO': return 1\n", " elif severity == 'WARNING': return 3\n", " elif severity == 'SUSPICIOUS': return 5\n", " elif severity == 'SEVERE': return 7\n", " elif severity == 'APOCALYPSE': return 10\n", " return 0\n", "# translate a severity \n", "def value_severity(severity):\n", " if severity >= 9: return 'APOCALYPSE'\n", " elif severity >= 6: return 'SEVERE'\n", " elif severity >= 4: return 'SUSPICIOUS'\n", " elif severity >= 2: return 'WARNING'\n", " elif severity >= 1: return 'INFO'\n", " elif severity >= 0: return 'UNKNOWN'\n", "\n", "# Plot by how actionable the data is \n", "# Build a special dataframe and chart it\n", "data_frame['severity_value'] = data_frame.severity.apply(severity_value)\n", "df2 = pd.DataFrame({'count' : data_frame.groupby(['name', 'confidence', 'severity_value']).size()}).reset_index()\n", "ax = df2.plot(\n", " kind='scatter', \n", " x='severity_value', y='confidence', \n", " xlim=(-1,11), ylim=(-10,110), \n", " title='Data by Conf / Sev With Threshold Line',\n", " ax=axes[3,1],\n", " s=df2['count'].apply(lambda x: 1000 * math.log10(x)),\n", " use_index=td.SEVERITY\n", ")\n", "# Draw a threshhold for data we consider likely using for alerts (aka 'high value')\n", "ax.plot([2,10], [100,0], c='red')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dive A Little Deeper\n", "\n", "Take a subset of the data and understand it a little more. \n", "\n", "In this example, we presume that we'd like to take phishing related data and study it, to see if we can use it to better defend a corporate network or abuse in a product. \n", "\n", "As a simple example, we'll filter down to data labeled **`MALICIOUS`** and the word **`phish`** in the description, to see if we can make a more detailed conclusion on how to apply the data to our existing internal workflows." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from pytx.vocabulary import Status as s\n", "\n", "\n", "phish_data = data_frame[(data_frame.status == s.MALICIOUS) \n", " & data_frame.description.apply(lambda x: x.find('phish') if x != None else False)]\n", "# TODO: also filter for attack_type == PHISHING, when Pytx supports it\n", "\n", "%matplotlib inline\n", "\n", "# Setup subplots for our deeper dive plots\n", "fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16,8))\n", "\n", "# Heatmap of type / source\n", "owner_and_type = pd.DataFrame(phish_data[['name', 'type']])\n", "owner_and_type['n'] = 1\n", "grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)\n", "ax = sns.heatmap(\n", " data=grouped['n'], \n", " robust=True,\n", " cmap=\"YlGnBu\",\n", " ax=axes[0]\n", ")\n", "\n", "# Tag breakdown of the top 10 tags\n", "tags = pd.DataFrame([item for sublist in phish_data.tags for item in sublist])\n", "tags[0].value_counts().head(10).plot(\n", " kind='pie',\n", " title=\"Top 10 Tags (-\" + str(days_back) + \"d)\",\n", " ax=axes[1]\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Extract The High Confidence / Severity Data For Use\n", "\n", "With a better understanding of the data, let's filter the **`MALICIOUS`**, **`REVIEWED_MANUALLY`** labeled data down to a pre-determined threshold for confidence + severity. \n", "\n", "You can add more filters, or change the threshold, as you see fit." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from pytx.vocabulary import ReviewStatus as rs\n", "\n", "# define our threshold line, which is the same as the red, threshold line in the chart above\n", "sev_min = 2\n", "sev_max = 10\n", "conf_min= 0\n", "conf_max = 100\n", "\n", "# build a new series, to indicate if a row passes our confidence + severity threshold\n", "def is_high_value(conf, sev):\n", " return (((sev_max - sev_min) * (conf - conf_max)) - ((conf_min - conf_max) * (sev - sev_min))) > 0\n", "data_frame['is_high_value']= data_frame.apply(lambda x: is_high_value(x.confidence, x.severity_value), axis=1)\n", "\n", "# filter down to just the data passing our criteria, you can add more here to filter by type, source, etc.\n", "high_value_data = data_frame[data_frame.is_high_value \n", " & (data_frame.status == s.MALICIOUS)\n", " & (data_frame.review_status == rs.REVIEWED_MANUALLY)].reset_index(drop=True)\n", "\n", "# get a count of how much we kept\n", "print \"Kept %d of %d data as high value\" % (high_value_data.size, data_frame.size)\n", "\n", "# ... and preview it\n", "high_value_data.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, output all of the high value data to a file as CSV or JSON, for consumption in our other systems and workflows." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "use_csv = False\n", "\n", "if use_csv:\n", " file_name = 'threat_exchange_high_value.csv'\n", " high_value_data.to_csv(path_or_buf=file_name)\n", " print \"CSV data written to %s\" % file_name\n", "else:\n", " file_name = 'threat_exchange_high_value.json'\n", " high_value_data.to_json(path_or_buf=file_name, orient='index')\n", " print \"JSON data written to %s\" % file_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }